package sinktest

import Source.SourceTest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.{FileSystem, Path}
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

// Flink job: write a stream to files on the local filesystem
// Reads sensor CSV records from a local file, parses them into SensorReading,
// and writes them back out with a StreamingFileSink.
object FileSink {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath: String = "E:\\Flinklearn\\Flink\\resources\\data1\\sensor.txt"
    val inputDataStream: DataStream[String] = env.readTextFile(inputPath)

    // Parse each CSV line into the SensorReading case class.
    // .trim guards against stray whitespace around the comma-separated fields;
    // toLong/toDouble still throw on genuinely malformed records, which is
    // acceptable for this demo job.
    val ds1: DataStream[SensorReading] = inputDataStream.map(
      data => {
        val fields: Array[String] = data.split(",").map(_.trim)
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })

    ds1.print()

    // Option 1: writeAsCsv — simple but deprecated sink, kept for reference.
    // (Typo fixed from the original: `wrteAsCsv` -> `writeAsCsv`.)
//    ds1.writeAsCsv("E:\\Flinklearn\\Flink\\resources\\data1\\out.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1)

    // Option 2: addSink with StreamingFileSink — the recommended, more general
    // sink that fits Flink's distributed, checkpointed architecture.
    ds1.addSink(StreamingFileSink.forRowFormat(
      new Path("E:\\Flinklearn\\Flink\\resources\\data1\\out"),
      new SimpleStringEncoder[SensorReading]()).build()
    ).setParallelism(1)

    // Nothing runs until execute() submits the job graph.
    env.execute("file sink test")
  }
}
